package com.example.dobs.demo.flink.io;

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.util.Collector;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.Iterator;
import java.util.Map;

/**
 * Flink source that emits every line of a text file, re-reading the file
 * whenever the wall-clock minute changes.
 *
 * <p>{@link #main(String[])} wires this source (parallelism 1) as a broadcast
 * stream and connects it to a socket text stream on {@code localhost:9000}.
 */
public class MinuteBroadcastSource extends RichParallelSourceFunction<String> {

    /** How long {@link #run} sleeps between checks of the current minute. */
    private static final long POLL_INTERVAL_MILLIS = 1000L;

    /** Loop flag; volatile so the write in {@link #cancel()} is visible to {@link #run}. */
    private volatile boolean running = true;

    /** Minute-of-hour of the last completed read; -1 until the first read. */
    private int lastUpdateMin = -1;

    /** File whose lines are emitted. */
    private final Path path = new Path("file:///Users/msxr/develop/var/tmp/employee.txt");

    /**
     * Polls once per second and, each time the minute-of-hour differs from the
     * one recorded at the last read, emits every line of the file.
     *
     * @param ctx context used to emit the lines
     * @throws Exception if the file cannot be opened or read, or if the sleep
     *                   is interrupted
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            int min = LocalDateTime.now().getMinute();
            if (min != lastUpdateMin) {
                emitFileLines(ctx);
                // Record the minute only after a complete read; without this
                // assignment the file would be re-emitted on every iteration.
                lastUpdateMin = min;
            }
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }
    }

    /** Reads the file as UTF-8 and emits each line, closing the stream afterwards. */
    private void emitFileLines(SourceContext<String> ctx) throws Exception {
        FileSystem fileSystem = path.getFileSystem();
        try (FSDataInputStream in = fileSystem.open(path);
             BufferedReader reader =
                     new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                ctx.collect(line);
            }
        }
    }

    /** Requests that the loop in {@link #run} stop after its current iteration. */
    @Override
    public void cancel() {
        running = false;
    }

    /**
     * Builds and runs the demo job: lines from the socket stream are forwarded
     * unchanged while the current broadcast state is printed for each of them.
     *
     * @param args unused
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Define a MapStateDescriptor describing the format of the data to broadcast:
        //    an updatable, iterable, key-value MapState.
        final MapStateDescriptor<String, String> configDescriptor = new MapStateDescriptor<>(
                "wordsConfig",
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);

        // 2. Use a source stream with parallelism 1 to broadcast to the downstream operator.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 3. Add the source and register it as a broadcast stream.
        BroadcastStream<String> broadcastStream = env
                .addSource(new MinuteBroadcastSource())
                .setParallelism(1)
                .broadcast(configDescriptor);

        // 4. Connect the broadcast stream with the stream carrying the data to process.
        DataStreamSource<String> orderStream = env.socketTextStream("localhost", 9000);
        // NOTE(review): no sink is attached to connectedStream, so forwarded
        // elements are not written anywhere — confirm whether one is wanted.
        DataStream<String> connectedStream = orderStream.connect(broadcastStream).process(
                new BroadcastProcessFunction<String, String, String>() {
                    /** Prints every broadcast-state entry, then forwards the element unchanged. */
                    @Override
                    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out)
                            throws Exception {
                        ReadOnlyBroadcastState<String, String> broadcastState =
                                ctx.getBroadcastState(configDescriptor);
                        for (Map.Entry<String, String> entry : broadcastState.immutableEntries()) {
                            System.out.println("all config:" + entry.getKey() + "   value:" + entry.getValue());
                        }
                        out.collect(value);
                    }

                    /**
                     * Stores a fixed entry in the broadcast state for each broadcast element.
                     */
                    @Override
                    public void processBroadcastElement(String value, Context ctx, Collector<String> out)
                            throws Exception {
                        BroadcastState<String, String> state = ctx.getBroadcastState(configDescriptor);
                        // NOTE(review): the broadcast line itself is ignored and a
                        // constant "s" -> "1" is stored; presumably the line should be
                        // parsed into a key/value pair — confirm the intended format.
                        state.put("s", "1");
                    }
                });

        // The topology above is only a description; nothing runs until execute() is called.
        env.execute("MinuteBroadcastSource");
    }
}

